---
title: prefect-ray
description: Accelerate your workflows by running tasks in parallel with Ray
---

[Ray](https://docs.ray.io/en/latest/index.html) can run your tasks in parallel by distributing them over multiple machines.
The `prefect-ray` integration makes it easy to accelerate your flow runs with Ray.

## Install `prefect-ray`

The following command will install a version of `prefect-ray` compatible with your installed version of `prefect`.
If you don't already have `prefect` installed, it will install the newest version of `prefect` as well.

```bash
pip install "prefect[ray]"
```

Upgrade to the latest versions of `prefect` and `prefect-ray`:

```bash
pip install -U "prefect[ray]"
```

<Warning>
**Ray limitations**

There are a few limitations with Ray:

- Ray has [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies) support for Python 3.13, but Prefect [does _not_ currently support](https://github.com/PrefectHQ/prefect/issues/16910) Python 3.13.
- Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during installation of Prefect. It is possible to manually install the blocking component with `conda`. See the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.
- Ray support for Windows is currently in beta.

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
</Warning>

## Run tasks on Ray

The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution.
By default, a temporary Ray instance is created for the duration of the flow run.
For example, this flow counts to three in parallel:

```python
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```

If you already have a Ray instance running, you can provide the connection URL via an `address` argument.

To configure your flow to use the `RayTaskRunner`:

1. Make sure the `prefect-ray` collection is installed as described earlier: `pip install prefect-ray`.
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Assign it as the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.

For example, this flow uses the `RayTaskRunner` with a local, temporary Ray instance created by Prefect at flow run time.

```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
```

This flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://<head_node_host>:10001`.

```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://<head_node_host>:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
```

`RayTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init_kwargs | Additional kwargs to use when calling `ray.init`. |

The Ray client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.

## Run tasks on a remote Ray cluster

When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance.
To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:

1. By default, Prefect will not persist any data to the filesystem of the remote ray worker. However, if you want to take advantage of Prefect's caching ability, you will need to configure a remote result storage to persist results across task runs.

We recommend using the [Prefect UI to configure a storage block](https://docs.prefect.io/develop/blocks/) to use for remote results storage.

Here's an example of a flow that uses caching and remote result storage:

```python
from typing import List

from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.tasks import task_input_hash
from prefect_aws import S3Bucket
from prefect_ray.task_runners import RayTaskRunner


# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> None:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name


@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    say_hello.map(names).wait()


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

2. If you get an error stating that the module 'prefect' cannot be found, ensure `prefect` is installed on the remote cluster, with:

```bash
pip install prefect
```

3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on **both local and remote machines**. For example, if using S3 for storage:

```bash
pip install s3fs
```

4. If you are seeing timeout or other connection errors, double check the address provided to the `RayTaskRunner`. The address should look similar to: `address='ray://<head_node_ip_address>:10001'`:

```bash
RayTaskRunner(address="ray://1.23.199.255:10001")
```

## Specify remote options

The `remote_options` context can be used to control the task's remote options. For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options


@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42).wait()
```

## Resources

Refer to the `prefect-ray` [SDK documentation](https://reference.prefect.io/prefect_ray/) to explore all the capabilities of the `prefect-ray` library.

For further assistance using Ray, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html).
